# Anonlink Entity Service API

This tutorial demonstrates interacting directly with the entity service via its REST API. The main alternative is to use
a library or command line tool such as [`anonlink-client`](https://anonlink-client.readthedocs.io/), which handles the communication with the Anonlink Entity Service for you.

### Dependencies

In this tutorial we interact with the REST API using the `requests` Python library. Additionally we use the `clkhash` Python library to define the linkage schema and to encode the PII. The synthetic dataset comes from the `recordlinkage` package. The object store upload uses the `minio` client, and the results section uses `anonlink-client`. All the dependencies can be installed with pip:

```
pip install requests clkhash recordlinkage minio anonlink-client
```


### Steps

* Check connection to Anonlink Entity Service
* Synthetic Data generation and encoding
* Create a new linkage project
* Upload the encodings
* Create a run
* Retrieve and analyse results

In [1]:
import json
import os
import time
import requests

from IPython.display import clear_output

## Check Connection

If you are connecting to a custom entity service, change the address here.

In [2]:
server = os.getenv("SERVER", "https://anonlink.easd.data61.xyz")
url = server + "/api/v1/"
print(f'Testing anonlink-entity-service hosted at {url}')

Testing anonlink-entity-service hosted at https://anonlink.easd.data61.xyz/api/v1/


In [3]:
requests.get(url + 'status').json()

{'project_count': 2, 'rate': 9129125, 'status': 'ok'}
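
The requests in this tutorial build endpoint addresses by string concatenation, which relies on `url` ending in exactly one slash. As a minimal sketch, a small helper can normalise the slashes instead. The name `api_url` and its `prefix` parameter are hypothetical and not part of any Anonlink library.

```python
def api_url(server, *parts, prefix="api/v1"):
    """Join a server address and path parts with exactly one slash between them."""
    pieces = [server.rstrip("/"), prefix.strip("/")]
    pieces += [str(part).strip("/") for part in parts]
    # Drop empty pieces so stray slashes never produce '//' in the path.
    return "/".join(piece for piece in pieces if piece)


print(api_url("https://anonlink.easd.data61.xyz", "status"))
print(api_url("https://anonlink.easd.data61.xyz/", "projects", "abc123"))
```

With such a helper, a trailing slash on the server address (or a leading slash on a path part) no longer changes the resulting address.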

## Data preparation

This section won't be explained in great detail as it directly follows the 
[clkhash tutorials](http://clkhash.readthedocs.io/en/latest/).

We encode a synthetic dataset from the `recordlinkage` library using `clkhash`.

In [4]:
from tempfile import NamedTemporaryFile
from recordlinkage.datasets import load_febrl4

In [5]:
dfA, dfB = load_febrl4()

In [6]:
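# Note: pandas >= 1.5 spells this keyword `lineterminator`;
# older pandas versions use `line_terminator` instead.
with open('a.csv', 'w') as a_csv:
    dfA.to_csv(a_csv, lineterminator='\n')

with open('b.csv', 'w') as b_csv:
    dfB.to_csv(b_csv, lineterminator='\n')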

## Schema Preparation

The linkage schema must be agreed on by the two parties. A hashing schema instructs `clkhash` how to treat each column for encoding PII into CLKs. A detailed description of the hashing schema can be found in the [clkhash documentation](https://clkhash.readthedocs.io/en/latest/schema.html).

A linkage schema can be defined either in Python code, as shown here, or as a JSON file (shown in other tutorials). The importance of each field is controlled by the strategy given in its `FieldHashingProperties` (here `BitsPerTokenStrategy(15)` for every field).
We ignore the record id and social security id fields so they won't be incorporated into the encoding.

In [7]:
import clkhash
from clkhash.comparators import *
from clkhash.field_formats import *

schema = clkhash.randomnames.NameList.SCHEMA
_missing = MissingValueSpec(sentinel='')
schema.fields = [
    Ignore('rec_id'),
    StringSpec('given_name',
               FieldHashingProperties(
                   NgramComparison(2),
                   BitsPerTokenStrategy(15))),
    StringSpec('surname',
               FieldHashingProperties(
                   NgramComparison(2),
                   BitsPerTokenStrategy(15))),
    IntegerSpec('street_number',
                FieldHashingProperties(
                    NgramComparison(1, positional=True),
                    BitsPerTokenStrategy(15),
                    missing_value=_missing)),
    StringSpec('address_1',
               FieldHashingProperties(
                   NgramComparison(2),
                   BitsPerTokenStrategy(15))),
    StringSpec('address_2',
               FieldHashingProperties(
                   NgramComparison(2),
                   BitsPerTokenStrategy(15))),
    StringSpec('suburb',
               FieldHashingProperties(
                   NgramComparison(2),
                   BitsPerTokenStrategy(15))),
    IntegerSpec('postcode',
                FieldHashingProperties(
                    NgramComparison(1, positional=True),
                    BitsPerTokenStrategy(15))),
    StringSpec('state',
               FieldHashingProperties(
                   NgramComparison(2),
                   BitsPerTokenStrategy(15))),
    IntegerSpec('date_of_birth',
                FieldHashingProperties(
                    NgramComparison(1, positional=True),
                    BitsPerTokenStrategy(15),
                    missing_value=_missing)),
    Ignore('soc_sec_id')
]

## Encoding

We now transform the *raw* personally identifiable information (PII) into CLK encodings, following the schema defined above. See the [clkhash](https://clkhash.readthedocs.io/) documentation for further details.

In [8]:
from clkhash import clk
with open('a.csv') as a_pii:
    hashed_data_a = clk.generate_clk_from_csv(a_pii, 'secret', schema, validate=False)
with open('clks_a.json', 'w') as f:
    json.dump({'clks': hashed_data_a}, f)

with open('b.csv') as b_pii:
    hashed_data_b = clk.generate_clk_from_csv(b_pii, 'secret', schema, validate=False)
with open('clks_b.json', 'w') as f:
    json.dump({'clks': hashed_data_b}, f)

generating CLKs: 100%|██████████| 5.00k/5.00k [00:01<00:00, 2.84kclk/s, mean=643, std=45.7]
generating CLKs: 100%|██████████| 5.00k/5.00k [00:01<00:00, 3.16kclk/s, mean=631, std=52.9]
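
To give some intuition for what an encoding of this kind contains, here is a toy keyed Bloom filter over 2-grams, written with the standard library only. It is an illustrative sketch and not the algorithm `clkhash` implements: the hashing scheme, the space padding and the names `bigrams`, `toy_encode` and `dice` are all assumptions made for this example. The defaults mirror the tutorial's 15 bits per token and a 1024-bit (128-byte) filter.

```python
import hashlib
import hmac


def bigrams(value):
    """Split a string into overlapping 2-grams, padded with a space at each end."""
    padded = f" {value} "
    return [padded[i:i + 2] for i in range(len(padded) - 1)]


def toy_encode(fields, secret, num_bits=1024, bits_per_token=15):
    """Return the sorted bit positions set by a toy keyed Bloom filter encoding."""
    key = secret.encode("utf-8")
    positions = set()
    for value in fields:
        for token in bigrams(value):
            # Each token is hashed `bits_per_token` times with the shared secret.
            for i in range(bits_per_token):
                message = f"{i}:{token}".encode("utf-8")
                digest = hmac.new(key, message, hashlib.sha256).digest()
                positions.add(int.from_bytes(digest[:8], "big") % num_bits)
    return sorted(positions)


def dice(a, b):
    """Sørensen-Dice coefficient of two collections of bit positions."""
    a, b = set(a), set(b)
    return 2 * len(a & b) / (len(a) + len(b))


original = toy_encode(["michaela", "neumann"], "secret")
typo = toy_encode(["michela", "neuman"], "secret")
other = toy_encode(["william", "oconnor"], "secret")
print(f"similar records:   {dice(original, typo):.2f}")
print(f"different records: {dice(original, other):.2f}")
```

Records that differ only by small typos share most of their 2-grams and therefore most of their set bits, so their similarity score is much higher than that of unrelated records. Without the shared secret the bit positions cannot be recomputed from the names.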


## Create Linkage Project

The analyst carrying out the linkage starts by creating a linkage project of the desired output type with the Entity Service.


In [9]:
project_spec = {
 "schema": {},
 "result_type": "groups",
 "number_parties": 2,
 "name": "API Tutorial Test"
}
credentials = requests.post(url + 'projects', json=project_spec).json()

project_id = credentials['project_id']
a_token, b_token = credentials['update_tokens']
credentials

{'project_id': '277ecc511e18dc10382b77f711dfbcf57f028405d57e650c',
 'result_token': '7878f100f21bfb041f2b7411ac8eba7246fd43f08586c8c7',
 'update_tokens': ['81984628f5e0178cccfae34207581836f8cc1e39f92f9532',
 '3b13b1528d084baa51c2d25f715b3701cccb3f292c561eb9']}

The server returns a `project_id`, a `result_token` and a set of `update_tokens`, one for each data provider.
- The `project_id` references the project uniquely on the server.
- The `result_token` authorises project API requests, i.e., access to the result of the linkage.
- The `update_tokens` authorise the data upload. There is one `update_token` for each data provider, and each token can only be used once.

**Note:** the analyst will need to pass on the `project_id` (the
id of the linkage project) and one of the `update_tokens` to
each data provider.
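
As a sketch of that hand-over, the server response can be split into one bundle per data provider that deliberately leaves out the `result_token`. The function name `provider_bundles` and the placeholder values are hypothetical; only the keys `project_id`, `result_token` and `update_tokens` come from the response shown above.

```python
def provider_bundles(credentials):
    """Return one dict per data provider, holding only what that provider needs."""
    return [
        {"project_id": credentials["project_id"], "update_token": token}
        for token in credentials["update_tokens"]
    ]


# Placeholder values shaped like the server response above.
example_credentials = {
    "project_id": "example-project-id",
    "result_token": "example-result-token",
    "update_tokens": ["example-token-a", "example-token-b"],
}
for bundle in provider_bundles(example_credentials):
    print(bundle)
```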

The `result_token` can also be used to carry out project API requests:

In [10]:
requests.get(url + 'projects/{}'.format(project_id), 
 headers={"Authorization": credentials['result_token']}).json()

{'error': False,
 'name': 'API Tutorial Test',
 'notes': '',
 'number_parties': 2,
 'parties_contributed': 0,
 'project_id': '277ecc511e18dc10382b77f711dfbcf57f028405d57e650c',
 'result_type': 'groups',
 'schema': {},
 'uses_blocking': False}
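
The project description reports both `number_parties` and `parties_contributed`, so it can be used to check how many uploads are still outstanding. A minimal sketch, where the helper name `missing_contributions` is made up for this example:

```python
def missing_contributions(project_description):
    """Return how many parties have not yet uploaded their encodings."""
    expected = project_description["number_parties"]
    contributed = project_description["parties_contributed"]
    return max(expected - contributed, 0)


# Shaped like the project description returned above.
description = {"number_parties": 2, "parties_contributed": 0}
print(f"Waiting for {missing_contributions(description)} more upload(s)")
```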

Now the two data providers can upload their data, each authorising the upload with their own `update_token`.

## CLK Upload
There are currently two different ways of uploading CLKs to the entity service.

### Method 1: Direct Upload
The `clks` endpoint accepts CLKs in both JSON and binary format. However, this method is not recommended for large datasets, as uploads cannot be resumed and might time out.

In [None]:
a_response = requests.post(
 '{}projects/{}/clks'.format(url, project_id),
 json={'clks': hashed_data_a},
 headers={"Authorization": a_token}
).json()

In [None]:
b_response = requests.post(
 '{}projects/{}/clks'.format(url, project_id),
 json={'clks': hashed_data_b},
 headers={"Authorization": b_token}
).json()

### Method 2: Upload to Object Store
The entity service can be deployed with an object store, which the data providers can use to upload their CLKs.
First, the data provider requests a set of temporary credentials that authorise the upload to the object store. The returned temporary object store credentials can be used with any S3-compatible client, for example `boto3` or the MinIO Python client used below. The credentials only allow uploading data to a particular path in a particular bucket, for a limited period (12 hours by default).
After uploading the data, the client informs the entity service where the file is stored.

Note that this feature may be disabled by the administrator; in that case the endpoint returns a 500 server error.

In [11]:
from minio import Minio

upload_response = requests.get(
 url + 'projects/{}/authorize-external-upload'.format(project_id),
 headers={'Authorization': a_token},
).json()

upload_credentials = upload_response['credentials']
upload_info = upload_response['upload']

# Use Minio python client to upload data
mc = Minio(
 upload_info['endpoint'],
 access_key=upload_credentials['AccessKeyId'],
 secret_key=upload_credentials['SecretAccessKey'],
 session_token=upload_credentials['SessionToken'],
 region='us-east-1',
 secure=upload_info['secure']
 )

etag = mc.fput_object(
 upload_info['bucket'],
 upload_info['path'] + "/clks_a.json",
 'clks_a.json',
 metadata={
 "hash-count": 5000,
 "hash-size": 128
 })

# Notify the entity service that the data has been uploaded
res = requests.post(url + f"projects/{project_id}/clks",
 headers={'Authorization': a_token},
 json={
 'encodings': {
 'file': {
 'bucket': upload_info['bucket'],
 'path': upload_info['path'] + "/clks_a.json",
 }
 }
 })
print(f'Upload party A: {"OK" if res.status_code == 201 else "ERROR"}')
# Party B: repeat the same steps with the second update token
upload_response = requests.get(
 url + 'projects/{}/authorize-external-upload'.format(project_id),
 headers={'Authorization': b_token},
).json()

upload_credentials = upload_response['credentials']
upload_info = upload_response['upload']

# Use Minio python client to upload data
mc = Minio(
 upload_info['endpoint'],
 access_key=upload_credentials['AccessKeyId'],
 secret_key=upload_credentials['SecretAccessKey'],
 session_token=upload_credentials['SessionToken'],
 region='us-east-1',
 secure=upload_info['secure']
 )

etag = mc.fput_object(
 upload_info['bucket'],
 upload_info['path'] + "/clks_b.json",
 'clks_b.json',
 metadata={
 "hash-count": 5000,
 "hash-size": 128
 })

# Notify the entity service that the data has been uploaded
res = requests.post(url + f"projects/{project_id}/clks",
 headers={'Authorization': b_token},
 json={
 'encodings': {
 'file': {
 'bucket': upload_info['bucket'],
 'path': upload_info['path'] + "/clks_b.json",
 }
 }
 })
print(f'Upload party B: {"OK" if res.status_code == 201 else "ERROR"}')

Upload party A: OK
Upload party B: OK
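
The steps for party A and party B above differ only in the token and the file name, so they could be wrapped in one function. The following is a sketch under assumptions: the function name `upload_via_object_store` and its parameters are hypothetical, and the HTTP module and the object store client class are passed in as arguments (in this tutorial they would be `requests` and `Minio`) so the block itself has no third-party imports.

```python
import os


def upload_via_object_store(http, make_store_client, url, project_id,
                            update_token, local_path, hash_count, hash_size):
    """Upload one party's encodings to the object store and notify the service.

    `http` needs `get` and `post` like the `requests` module;
    `make_store_client` is called like the `Minio` constructor.
    """
    headers = {"Authorization": update_token}

    # 1. Request temporary credentials for the object store.
    auth = http.get(
        f"{url}projects/{project_id}/authorize-external-upload",
        headers=headers,
    ).json()
    creds, info = auth["credentials"], auth["upload"]

    # 2. Upload the file to the path the service allocated.
    store = make_store_client(
        info["endpoint"],
        access_key=creds["AccessKeyId"],
        secret_key=creds["SecretAccessKey"],
        session_token=creds["SessionToken"],
        region="us-east-1",
        secure=info["secure"],
    )
    object_path = f"{info['path']}/{os.path.basename(local_path)}"
    store.fput_object(
        info["bucket"], object_path, local_path,
        metadata={"hash-count": hash_count, "hash-size": hash_size},
    )

    # 3. Tell the entity service where the encodings are stored.
    response = http.post(
        f"{url}projects/{project_id}/clks",
        headers=headers,
        json={"encodings": {"file": {"bucket": info["bucket"],
                                     "path": object_path}}},
    )
    # The tutorial treats HTTP 201 as a successful upload notification.
    return response.status_code == 201


# Intended usage in this tutorial (not executed here):
# ok = upload_via_object_store(requests, Minio, url, project_id,
#                              a_token, 'clks_a.json', 5000, 128)
```

Passing the dependencies in keeps the function easy to exercise with stand-in objects before pointing it at a real service.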


Every upload gets a receipt token. In some operating modes this receipt is required to access the results.

## Create a run

Now that the project has been created and the CLK encodings have been uploaded, we can carry out some privacy-preserving record linkage. The same encoding data can be linked using different threshold values by creating **runs**.

In [12]:
run_response = requests.post(
 "{}projects/{}/runs".format(url, project_id),
 headers={"Authorization": credentials['result_token']},
 json={
 'threshold': 0.80,
 'name': "Tutorial Run #1"
 }
).json()

In [13]:
run_id = run_response['run_id']

## Run Status

In [14]:
requests.get(
 '{}projects/{}/runs/{}/status'.format(url, project_id, run_id),
 headers={"Authorization": credentials['result_token']}
 ).json()

{'current_stage': {'description': 'waiting for CLKs',
 'number': 1,
 'progress': {'absolute': 2,
 'description': 'number of parties already contributed',
 'relative': 1.0}},
 'stages': 3,
 'state': 'created',
 'time_added': '2020-06-15T05:41:44.177808',
 'time_started': None}

## Results

After some delay (depending on the size of the datasets) we can fetch the results. This could be done by polling the REST API directly with `requests`; for simplicity we use the `watch_run_status` method of the `RestClient` class provided in `anonlinkclient.rest_client`.

> Note the `server` is provided rather than `url`.

In [15]:
from anonlinkclient.rest_client import RestClient, format_run_status
rest_client = RestClient(server)

for update in rest_client.watch_run_status(project_id, run_id, credentials['result_token'], timeout=300):
    clear_output(wait=True)
    print(format_run_status(update))

State: completed
Stage (3/3): compute output
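
As an alternative to `watch_run_status`, polling the run status endpoint directly could look roughly like the sketch below. The function `wait_for_run` is hypothetical; the state name `completed` appears in the output above, while `error` as the name of a failure state is an assumption. The status fetcher is passed in as a callable so the loop itself does not depend on the network.

```python
import time


def wait_for_run(fetch_status, poll_interval=2.0, timeout=300.0, sleep=time.sleep):
    """Poll `fetch_status()` until the run completes, fails, or times out."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        state = status.get("state")
        if state == "completed":
            return status
        if state == "error":  # assumed name of the failure state
            raise RuntimeError(f"run failed: {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still in state {state!r} after {timeout}s")
        sleep(poll_interval)


# Intended usage in this tutorial (not executed here):
# final_status = wait_for_run(lambda: requests.get(
#     '{}projects/{}/runs/{}/status'.format(url, project_id, run_id),
#     headers={"Authorization": credentials['result_token']}).json())
```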


In [16]:
data = json.loads(rest_client.run_get_result_text(
 project_id, 
 run_id, 
 credentials['result_token']))

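This result is the 1-1 mapping between rows that were more similar than the given threshold. Each entry of `data['groups']` holds one `(party index, row index)` pair per party, so sorting an entry puts the row from the first dataset before the row from the second.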

In [17]:
for i in range(10):
    ((_, a_index), (_, b_index)) = sorted(data['groups'][i])
    print("a[{}] maps to b[{}]".format(a_index, b_index))
print("...")

a[174] maps to b[4485]
a[2570] maps to b[3737]
a[1920] maps to b[4157]
a[420] maps to b[4323]
a[136] maps to b[1416]
a[3090] maps to b[4797]
a[2940] maps to b[3663]
a[2228] maps to b[1095]
a[2623] maps to b[3447]
a[672] maps to b[2795]
...


In this dataset there are 5000 records in common. With the chosen threshold and schema we currently retrieve:

In [18]:
len(data['groups'])

4842
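
For further analysis it can be convenient to turn the groups into a dictionary from rows of the first dataset to rows of the second. The sketch below assumes a two-party project in which every group holds exactly two `[party, row]` entries, as in the output above; the function name `groups_to_mapping` and the example data are made up for illustration.

```python
def groups_to_mapping(groups):
    """Convert two-party groups of (party, row) pairs to {row_in_a: row_in_b}."""
    mapping = {}
    for group in groups:
        # Sorting puts the party 0 entry before the party 1 entry.
        (_, a_index), (_, b_index) = sorted(group)
        mapping[a_index] = b_index
    return mapping


example_groups = [[[0, 174], [1, 4485]], [[1, 3737], [0, 2570]]]
mapping = groups_to_mapping(example_groups)
print(mapping)

# Share of the 5000 common records for which a link was reported.
print(f"{4842 / 5000:.1%}")
```

Note that this share only counts reported links. Whether each link is correct would have to be checked against the true matches in the dataset.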

## Cleanup

If you want, you can delete the run and project from the anonlink-entity-service.

In [19]:
# `url` already ends with a slash, so no extra slash is needed before 'projects'
requests.delete("{}projects/{}".format(url, project_id), headers={"Authorization": credentials['result_token']})

